// SPDX-FileCopyrightText: Copyright 2025-2025 深圳市同心圆网络有限公司
// SPDX-License-Identifier: GPL-3.0-only

package service_cmd

import (
	"context"
	"errors"
	"fmt"
	"net"
	"os"
	"os/signal"
	"syscall"

	"gitcode.com/opendragonfly/df_proto_gen_go.git/config_api"
	"gitcode.com/opendragonfly/df_proto_gen_go.git/member_api"
	"gitcode.com/opendragonfly/df_proto_gen_go.git/trace_api"
	"gitcode.com/opendragonfly/df_server/api_impl/config_api_impl"
	"gitcode.com/opendragonfly/df_server/api_impl/member_api_impl"
	"gitcode.com/opendragonfly/df_server/api_impl/trace_api_impl"
	"gitcode.com/opendragonfly/df_server/config"
	"gitcode.com/opendragonfly/df_server/dao"
	"gitcode.com/opendragonfly/df_server/dao/trace_dao"
	"gitcode.com/opendragonfly/df_server/utils"
	grpc_middleware "github.com/grpc-ecosystem/go-grpc-middleware"
	grpc_recovery "github.com/grpc-ecosystem/go-grpc-middleware/recovery"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/jaegerreceiver"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/skywalkingreceiver"
	"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver"
	"github.com/spf13/cobra"
	"go.opentelemetry.io/collector/component"
	"go.opentelemetry.io/collector/config/configauth"
	"go.opentelemetry.io/collector/config/confighttp"
	"go.opentelemetry.io/collector/config/configtelemetry"
	"go.opentelemetry.io/collector/consumer"
	"go.opentelemetry.io/collector/pdata/pcommon"
	"go.opentelemetry.io/collector/pdata/ptrace"
	"go.opentelemetry.io/collector/receiver"
	"go.opentelemetry.io/collector/receiver/otlpreceiver"
	noopmetric "go.opentelemetry.io/otel/metric/noop"
	nooptrace "go.opentelemetry.io/otel/trace/noop"
	"go.uber.org/zap"
	"google.golang.org/grpc"
)

// runCmd is the cobra "run" sub-command; executing it invokes doRunService.
var runCmd = &cobra.Command{
	Use:  "run",
	RunE: doRunService,
}

// doRunService is the RunE handler of the "run" command.
//
// It reads the cached server configuration, initialises the DAO layer,
// claims the PID file, starts every trace receiver enabled in cfg.Port and
// launches the gRPC API service. It then blocks until a non-nil error is
// delivered on the internal exit channel and returns that error.
//
// SIGINT, SIGTERM and SIGQUIT push a "catch signal" error onto the exit
// channel; SIGHUP only calls config.MarkServerConfigChange.
func doRunService(cmd *cobra.Command, args []string) error {
	cfg, err := config.ReadServerConfigFromCache()
	if err != nil {
		return err
	}

	// Initialise the database.
	if err = dao.Init(cfg.KeepTraceDay); err != nil {
		return err
	}
	defer dao.Close()

	if err = processPidFile(); err != nil {
		return err
	}
	defer os.Remove(utils.PID_FILE_PATH)

	exitChan := make(chan error, 10)

	// Receivers are started in this fixed order; each one reports start-up
	// failures through exitChan instead of returning them.
	receivers := []struct {
		enabled bool
		start   func(config.PortConfig, chan<- error)
	}{
		{cfg.Port.EnableZipkin, startZipkinReciver},
		{cfg.Port.EnableJaegerGrpc || cfg.Port.EnableJaegerHttp, startJaegerReciver},
		{cfg.Port.EnableSkywalkingGrpc || cfg.Port.EnableSkywalkingHttp, startSkywalkingReciver},
		{cfg.Port.EnableOtlpGrpc || cfg.Port.EnableOtlpHttp, startOtlpReciver},
	}
	for _, r := range receivers {
		if r.enabled {
			r.start(cfg.Port, exitChan)
		}
	}

	go runService(cfg.Port.ServicePort, exitChan)

	// Signal handling: SIGHUP flags a configuration change, any other
	// registered signal requests termination.
	go func() {
		signals := make(chan os.Signal, 1)
		signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGHUP)
		for {
			switch <-signals {
			case syscall.SIGHUP:
				config.MarkServerConfigChange()
			default:
				exitChan <- errors.New("catch signal")
			}
		}
	}()

	// Wait for the first real error from any component.
	for {
		if exitErr := <-exitChan; exitErr != nil {
			return exitErr
		}
	}
}

// processPidFile prevents two server instances from running at the same time
// and records the current process in the PID file.
//
// If utils.LoadPid yields a PID and that process still exists, an error is
// returned and the PID file is left untouched. Otherwise any stale PID file
// is removed (best effort) and the result of utils.StorePid is returned.
func processPidFile() error {
	oldPid, err := utils.LoadPid()
	if err != nil {
		// No usable PID on record, so there is nothing to probe.
		return utils.StorePid()
	}

	// On Unix FindProcess always succeeds; on other platforms it can fail,
	// in which case the recorded process is treated as gone.
	if p, findErr := os.FindProcess(oldPid); findErr == nil {
		// Signal 0 performs the existence/permission check without
		// delivering a signal.
		sigErr := p.Signal(syscall.Signal(0))
		// EPERM means the process exists but is owned by another user, so
		// an instance is still running.
		if sigErr == nil || errors.Is(sigErr, syscall.EPERM) {
			return errors.New("already running")
		}
	}

	// The recorded process is gone: drop the stale file before rewriting it.
	os.Remove(utils.PID_FILE_PATH)
	return utils.StorePid()
}

// createSettings builds the receiver.Settings shared by all trace receivers
// in this file: a no-op logger, no-op tracer and meter providers, metrics
// level LevelNone, a fresh pcommon resource and the default build info.
func createSettings() receiver.Settings {
	telemetry := component.TelemetrySettings{
		Logger:         zap.NewNop(),
		TracerProvider: nooptrace.NewTracerProvider(),
		MeterProvider:  noopmetric.NewMeterProvider(),
		MetricsLevel:   configtelemetry.LevelNone,
		Resource:       pcommon.NewResource(),
	}
	buildInfo := component.NewDefaultBuildInfo()

	return receiver.Settings{
		TelemetrySettings: telemetry,
		BuildInfo:         buildInfo,
	}
}

// startZipkinReciver creates and starts the Zipkin trace receiver.
//
// The receiver listens on 0.0.0.0:<portCfg.ZipkinPort>, uses the
// authenticator identified by AuthComponentId and forwards every received
// batch to trace_dao.TraceDao.ConsumeTraces. Any failure while building or
// starting the receiver is sent to exitChan; nothing is sent on success.
func startZipkinReciver(portCfg config.PortConfig, exitChan chan<- error) {
	zipkinFactory := zipkinreceiver.NewFactory()

	zipkinCfg := zipkinFactory.CreateDefaultConfig().(*zipkinreceiver.Config)
	zipkinCfg.Endpoint = fmt.Sprintf("0.0.0.0:%d", portCfg.ZipkinPort)
	authID := configauth.Authentication{AuthenticatorID: AuthComponentId}
	zipkinCfg.Auth = &confighttp.AuthConfig{Authentication: authID}

	settings := createSettings()

	// Sink that hands incoming traces to the DAO layer.
	sink, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error {
		return trace_dao.TraceDao.ConsumeTraces(ctx, td)
	})
	if err != nil {
		exitChan <- err
		return
	}

	rcv, err := zipkinFactory.CreateTraces(context.Background(), settings, zipkinCfg, sink)
	if err != nil {
		exitChan <- err
		return
	}

	if startErr := rcv.Start(context.Background(), &myHost{}); startErr != nil {
		exitChan <- startErr
	}
}

// startJaegerReciver creates and starts the Jaeger trace receiver.
//
// For each of gRPC and Thrift-HTTP, an enabled protocol is bound to
// 0.0.0.0:<configured port> and protected by the authenticator identified by
// AuthComponentId. A protocol that is not enabled in portCfg is removed from
// the configuration, so the receiver does not open it on its default
// endpoint without authentication.
//
// Received traces are forwarded to trace_dao.TraceDao.ConsumeTraces. Any
// failure while building or starting the receiver is sent to exitChan;
// nothing is sent on success.
func startJaegerReciver(portCfg config.PortConfig, exitChan chan<- error) {
	factory := jaegerreceiver.NewFactory()
	recvCfg := factory.CreateDefaultConfig().(*jaegerreceiver.Config)

	// The default config arrives with the protocol sections already
	// populated, so a disabled protocol must be cleared explicitly.
	// NOTE(review): the default config may also enable the Thrift UDP
	// protocols, which are neither configured nor disabled here — confirm
	// whether they should be switched off as well.
	if portCfg.EnableJaegerGrpc {
		recvCfg.Protocols.GRPC.NetAddr.Endpoint = fmt.Sprintf("0.0.0.0:%d", portCfg.JaegerGrpcPort)
		recvCfg.Protocols.GRPC.Auth = &configauth.Authentication{
			AuthenticatorID: AuthComponentId,
		}
	} else {
		recvCfg.Protocols.GRPC = nil
	}
	if portCfg.EnableJaegerHttp {
		recvCfg.Protocols.ThriftHTTP.Endpoint = fmt.Sprintf("0.0.0.0:%d", portCfg.JaegerHttpPort)
		recvCfg.Protocols.ThriftHTTP.Auth = &confighttp.AuthConfig{
			Authentication: configauth.Authentication{
				AuthenticatorID: AuthComponentId,
			},
		}
	} else {
		recvCfg.Protocols.ThriftHTTP = nil
	}

	setting := createSettings()

	// Sink that hands incoming traces to the DAO layer.
	tracer, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error {
		return trace_dao.TraceDao.ConsumeTraces(ctx, td)
	})
	if err != nil {
		exitChan <- err
		return
	}

	recvImpl, err := factory.CreateTraces(context.Background(), setting, recvCfg, tracer)
	if err != nil {
		exitChan <- err
		return
	}
	if err = recvImpl.Start(context.Background(), &myHost{}); err != nil {
		exitChan <- err
	}
}

// startSkywalkingReciver creates and starts the SkyWalking trace receiver.
//
// For each of gRPC and HTTP, an enabled protocol is bound to
// 0.0.0.0:<configured port> and protected by the authenticator identified by
// AuthComponentId. A protocol that is not enabled in portCfg is removed from
// the configuration, so the receiver does not open it on its default
// endpoint without authentication.
//
// Received traces are forwarded to trace_dao.TraceDao.ConsumeTraces. Any
// failure while building or starting the receiver is sent to exitChan;
// nothing is sent on success.
func startSkywalkingReciver(portCfg config.PortConfig, exitChan chan<- error) {
	factory := skywalkingreceiver.NewFactory()
	recvCfg := factory.CreateDefaultConfig().(*skywalkingreceiver.Config)

	// The default config arrives with both protocol sections already
	// populated, so a disabled protocol must be cleared explicitly.
	if portCfg.EnableSkywalkingGrpc {
		recvCfg.Protocols.GRPC.NetAddr.Endpoint = fmt.Sprintf("0.0.0.0:%d", portCfg.SkywalkingGrpcPort)
		recvCfg.Protocols.GRPC.Auth = &configauth.Authentication{
			AuthenticatorID: AuthComponentId,
		}
	} else {
		recvCfg.Protocols.GRPC = nil
	}
	if portCfg.EnableSkywalkingHttp {
		recvCfg.Protocols.HTTP.Endpoint = fmt.Sprintf("0.0.0.0:%d", portCfg.SkywalkingHttpPort)
		recvCfg.Protocols.HTTP.Auth = &confighttp.AuthConfig{
			Authentication: configauth.Authentication{
				AuthenticatorID: AuthComponentId,
			},
		}
	} else {
		recvCfg.Protocols.HTTP = nil
	}

	setting := createSettings()

	// Sink that hands incoming traces to the DAO layer.
	tracer, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error {
		return trace_dao.TraceDao.ConsumeTraces(ctx, td)
	})
	if err != nil {
		exitChan <- err
		return
	}

	recvImpl, err := factory.CreateTraces(context.Background(), setting, recvCfg, tracer)
	if err != nil {
		exitChan <- err
		return
	}
	if err = recvImpl.Start(context.Background(), &myHost{}); err != nil {
		exitChan <- err
	}
}

// startOtlpReciver creates and starts the OTLP trace receiver.
//
// For each of gRPC and HTTP, an enabled protocol is bound to
// 0.0.0.0:<configured port> and protected by the authenticator identified by
// AuthComponentId. A protocol that is not enabled in portCfg is removed from
// the configuration, so the receiver does not open it on its default
// endpoint without authentication.
//
// Received traces are forwarded to trace_dao.TraceDao.ConsumeTraces. Any
// failure while building or starting the receiver is sent to exitChan;
// nothing is sent on success.
func startOtlpReciver(portCfg config.PortConfig, exitChan chan<- error) {
	factory := otlpreceiver.NewFactory()
	recvCfg := factory.CreateDefaultConfig().(*otlpreceiver.Config)

	// The default config arrives with both protocol sections already
	// populated, so a disabled protocol must be cleared explicitly.
	if portCfg.EnableOtlpGrpc {
		recvCfg.Protocols.GRPC.NetAddr.Endpoint = fmt.Sprintf("0.0.0.0:%d", portCfg.OtlpGrpcPort)
		recvCfg.Protocols.GRPC.Auth = &configauth.Authentication{
			AuthenticatorID: AuthComponentId,
		}
	} else {
		recvCfg.Protocols.GRPC = nil
	}
	if portCfg.EnableOtlpHttp {
		recvCfg.Protocols.HTTP.Endpoint = fmt.Sprintf("0.0.0.0:%d", portCfg.OtlpHttpPort)
		recvCfg.Protocols.HTTP.Auth = &confighttp.AuthConfig{
			Authentication: configauth.Authentication{
				AuthenticatorID: AuthComponentId,
			},
		}
	} else {
		recvCfg.Protocols.HTTP = nil
	}

	setting := createSettings()

	// Sink that hands incoming traces to the DAO layer.
	tracer, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error {
		return trace_dao.TraceDao.ConsumeTraces(ctx, td)
	})
	if err != nil {
		exitChan <- err
		return
	}

	recvImpl, err := factory.CreateTraces(context.Background(), setting, recvCfg, tracer)
	if err != nil {
		exitChan <- err
		return
	}
	if err = recvImpl.Start(context.Background(), &myHost{}); err != nil {
		exitChan <- err
	}
}

func runService(port uint16, exitChan chan<- error) {
	optionList := []grpc.ServerOption{
		grpc.StreamInterceptor(
			grpc_middleware.ChainStreamServer(
				grpc_recovery.StreamServerInterceptor(),
			)),
		grpc.UnaryInterceptor(
			grpc_middleware.ChainUnaryServer(
				grpc_recovery.UnaryServerInterceptor(),
			)),
		grpc.InitialConnWindowSize(1 << 30),
		grpc.InitialWindowSize(1 << 30),
	}
	grpcServer := grpc.NewServer(optionList...)

	config_api.RegisterConfigApiServer(grpcServer, &config_api_impl.ConfigApiImpl{})
	member_api.RegisterMemberApiServer(grpcServer, &member_api_impl.MemberApiImpl{})
	trace_api.RegisterTraceApiServer(grpcServer, &trace_api_impl.TraceApiImpl{})

	addr, err := net.ResolveTCPAddr("tcp4", fmt.Sprintf("0.0.0.0:%d", port))
	if err != nil {
		exitChan <- err
		return
	}
	li, err := net.ListenTCP("tcp4", addr)
	if err != nil {
		exitChan <- err
		return
	}
	fmt.Println("service start......")
	err = grpcServer.Serve(li)
	if err != nil {
		exitChan <- err
		return
	}
}
